#==============================================================================#
# 4-family-ids.R
# Matt Curtis Aug 25 2022
# mjdcurtis@gmail.com
#------------------------------------------------------------------------------#
# convert unions data with multiple observations per individual
# to multiple union variables per individual
# edited Feb 2 2023
#==============================================================================#

rm(list = ls())
gc()
library(arrow)
library(tidytable)

#------------------------------------------------------------------------------#

# Work relative to the replication package root: every path used below is
# relative to this directory.
# (The workspace is already cleared before the libraries are attached and
# nothing is created in between, so it is not cleared a second time here.)
setwd("~/Replication package")

#------------------------------------------------------------------------------#

# Keys of the unions we care about, kept as an Arrow Table (not a data frame)
# so that the join against the union details below runs inside Arrow.
# The same file is de-duplicated on (profile_id, union_id) further down, i.e.
# a union_id can appear on several rows here. De-duplicate the keys up front:
# otherwise the right join below emits one copy of the detail row per repeated
# key, and all of those copies are collected into memory before distinct()
# throws them away again.
unions <- read_parquet("./2_scripts/2_0_tempfiles/fr-union6.parquet",
                       as_data_frame = FALSE) %>%
  dplyr::select(union_id) %>%
  dplyr::distinct() %>%
  dplyr::compute()
gc()



#------------------------------------------------------------------------------#
# load union details


# Read the union-detail chunks part-0 ... part-<max_chunk> one at a time,
# keep only the union id and the marriage fields, and stack the chunks into a
# single Arrow Table called `details`.
max_chunk <- 10
details <- NULL
for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ", chunk_n + 1, "/", max_chunk + 1))
  fname <- paste0(
    "./1_raw_data/1_2_geni/1_2_geni_chunked/geni_union_details/part-",
    chunk_n,
    ".parquet"
  )
  geni <- read_parquet(fname, as_data_frame = FALSE) %>%
    dplyr::select(
      union_id,
      marriage_range,
      marriage_start_day,
      marriage_start_month,
      marriage_start_year,
      marriage_start_circa,
      marriage_city,
      marriage_state,
      marriage_county,
      marriage_country,
      marriage_place_name
    ) %>%
    dplyr::compute()

  # the first chunk starts the table, every later chunk is appended to it
  if (is.null(details)) {
    details <- geni
  } else {
    details <- rbind(details, geni)
  }
}

gc()
print("Keeping relevant")
# Right join onto the union keys: every union in `unions` is kept, with its
# detail fields where a detail record exists. The result is then pulled out of
# Arrow into memory as a tidytable and exact duplicate rows are removed.
details <- dplyr::right_join(details, unions) %>%
  dplyr::collect() %>%
  tidytable() %>%
  distinct()
gc()

#------------------------------------------------------------------------------#
# marriages - keep 3, flag for more
# reload the full unions file, this time in memory and with every column
unions <- distinct(
  tidytable(read_parquet("./2_scripts/2_0_tempfiles/fr-union6.parquet"))
)

# link_type=p for partner
# union_type E or U for spouse or ex-spouse, so drop non-married couples
# merge in start year from details
marriages <- unions %>%
  filter(link_type == "p", union_type %in% c("E", "U")) %>%
  select(profile_id, union_id, union_type) %>%
  distinct() %>%
  left_join(select(details, marriage_start_year, union_id)) %>%
  distinct()

# each (profile_id, union_id) pair must occur exactly once; stop otherwise
check <- summarize(marriages, n = n(), .by = c(profile_id, union_id))
stopifnot(max(check$n) == 1)
rm(check)
gc()

# generate marriage numbers
# Within each profile_id the marriages are ordered by start year, with
# union_id and union_type as tie-breakers, and then numbered:
#   r = position of this marriage for the person
#   m = number of marriages of the person
#   munion_missing_year = 1 if any of the person's marriages has no start year
# if any are missing, first might be the NA one!
# The missing-year flag has to be computed per profile_id as well; without the
# grouping it is a single global value (1 for everybody as soon as one
# marriage anywhere lacks a start year).
marriages <- marriages %>%
  arrange(profile_id, marriage_start_year, union_id,
          union_type) %>%
  mutate(
    r = row_number(),
    m = n(),
    munion_missing_year = max(as.numeric(is.na(marriage_start_year))),
    .by = profile_id
  )

# reshape wide: one row per profile_id, with the union id of marriage number r
# in column munion<r>
munion <- pivot_wider(
  marriages,
  id_cols = c(profile_id, m, munion_missing_year),
  names_from = r,
  names_prefix = "munion",
  values_from = union_id
) %>%
  # flag people with 4 or more marriages (m > 3); only the first 3 are kept
  mutate(flag_4munion = m > 3) %>%
  select(profile_id, flag_4munion, munion1, munion2, munion3)

# merge in details from first marriage
# take each person's marriage number 1 (r == 1) and look up its detail fields
first_union <- select(filter(marriages, r == 1), profile_id, union_id)
fmar <- left_join(first_union, details) %>%
  # the union id itself is dropped here; only the marriage fields are kept
  select(
    profile_id,
    marriage_range,
    marriage_start_day,
    marriage_start_month,
    marriage_start_year,
    marriage_start_circa,
    marriage_city,
    marriage_state,
    marriage_county,
    marriage_country,
    marriage_place_name
  )

# wide marriage ids plus the first-marriage details, one table per profile_id
marriages <- full_join(munion, fmar)
rm(first_union, fmar, munion, details)
gc()
#------------------------------------------------------------------------------#

# parents - keep first, flag for more
# link_type "c" rows give the union(s) a profile is attached to as a child;
# they are numbered in the order they appear in the data
parents <- unions %>%
  filter(link_type == "c") %>%
  select(profile_id, union_id) %>%
  distinct() %>%
  mutate(r = row_number(), m = n(), .by = profile_id) %>%
  # reshape wide: punion<r> holds the r-th parent union of the profile
  pivot_wider(
    id_cols = c(profile_id, m),
    names_from = r,
    names_prefix = "punion",
    values_from = union_id
  ) %>%
  # flag profiles with 2 or more parent unions (m > 1)
  mutate(flag_2punion = m > 1) %>%
  # keep only the first parent union
  select(profile_id, flag_2punion, punion = punion1)

#------------------------------------------------------------------------------#
# merge
# one row per profile_id found anywhere in the unions file, with the parent
# union and the marriage unions/details attached where they exist
unions <- distinct(select(unions, profile_id)) %>%
  left_join(parents) %>%
  left_join(marriages)

rm(parents, marriages)
gc()
#------------------------------------------------------------------------------#
# now construct some useful variables

# married
# has_spouse is 1 when at least one of the three retained marriage-union ids
# is present and 0 otherwise. Two things need care here:
#  * profiles without any marriage get NA (not "") in munion1-3 from the left
#    join above, and NA == "" is NA, so a missing id must be tested with
#    is.na() as well as against the empty string;
#  * `%>%` binds tighter than unary `!`, so the conversion to numeric has to
#    wrap the negated condition explicitly, otherwise the negation is applied
#    last and the column ends up logical.
unions <- unions %>%
  mutate(has_spouse = as.numeric(
    !((is.na(munion1) | munion1 == "") &
        (is.na(munion2) | munion2 == "") &
        (is.na(munion3) | munion3 == ""))
  ))


#------------------------------------------------------------------------------#
# geni quality flag
# four gen flag


# generation link
# we want to link children to parents
# each row is (p, m, i): i is a profile_id, p is that profile's parent union
# (punion) and m is one of the profile's own marriage unions (munion1-3);
# the three marriage slots are stacked so every (p, m, i) combination is listed
genlink <- bind_rows(
  distinct(select(unions, p = punion, m = munion1, i = profile_id)),
  distinct(select(unions, p = punion, m = munion2, i = profile_id)),
  distinct(select(unions, p = punion, m = munion3, i = profile_id))
) %>%
  distinct()
gc()


# kids per profile_id
# start from every (parent union, kid) pair, attach the profiles that have
# that union as one of their marriage unions, and count the rows per profile
twoplus <- unions %>%
  select(punion, kid = profile_id) %>%
  distinct() %>%
  filter(!is.na(punion), punion != "") %>%
  left_join(
    select(genlink, punion = m, profile_id = i),
    relationship = "many-to-many"
  ) %>%
  summarize(nkids = n(), .by = profile_id) %>%
  # twoplus = 1 when the profile has more than one kid
  mutate(twoplus = as.numeric(nkids > 1)) %>%
  # parent unions without any matching profile end up with a missing id
  filter(!is.na(profile_id), profile_id != "")

# grow a tree from a seed
# seed: every profile (i0) with its parent union (p1). genlink holds one row
# per marriage slot of a profile, so the same (i, p) pair can occur up to
# three times; de-duplicate it, otherwise every duplicate is carried (and
# multiplied) through the four many-to-many joins below. The generation
# tables built from the tree only use distinct (i0, i<k>) pairs, so this does
# not change any result.
seed <- genlink %>%
  select(i0 = i, p1 = p) %>%
  distinct()
# only rows with a non-empty marriage union can act as a parent link
genlink <- filter(genlink, m != "")

# Each step matches the parent union found so far (p<k>) against the marriage
# unions in genlink, which yields the profiles in that union (i<k>) and their
# own parent union (p<k+1>).
tree <- seed %>%
  # link to parents
  left_join(select(genlink, i1 = i, p1 = m, p2 = p),
            relationship = "many-to-many") %>%
  # link to gparents
  left_join(select(genlink, i2 = i, p2 = m, p3 = p),
            relationship = "many-to-many") %>%
  # link to ggparents
  left_join(select(genlink, i3 = i, p3 = m, p4 = p),
            relationship = "many-to-many") %>%
  # link to gggparents
  left_join(select(genlink, i4 = i, p4 = m),
            relationship = "many-to-many")

rm(seed, genlink)
gc()

# attach the two plus indicator to every profile id in the family tree
# open question: are there ever punions with no related profile_ids for the
# parents? twoplus only has rows for non-missing profile_ids, so a punion with
# 2 kids but no profile_ids for the parents does not count - perhaps that is
# good?

# for generation k: the distinct (i0, i<k>) pairs plus the indicator of i<k>
gen1 <- left_join(distinct(select(tree, i0, i1)),
                  select(twoplus, i1 = profile_id, twoplus))

gen2 <- left_join(distinct(select(tree, i0, i2)),
                  select(twoplus, i2 = profile_id, twoplus))

gen3 <- left_join(distinct(select(tree, i0, i3)),
                  select(twoplus, i3 = profile_id, twoplus))

gen4 <- left_join(distinct(select(tree, i0, i4)),
                  select(twoplus, i4 = profile_id, twoplus))

# twoplus_flag is 1 for a seed profile i0 if any twoplus == 1 among the
# relatives stacked from the four generations, 0 otherwise (relatives without
# an indicator are ignored by the sum)
twoplus_flag <- bind_rows(gen1, gen2, gen3, gen4) %>%
  summarize(twoplus = sum(twoplus, na.rm = TRUE), .by = i0) %>%
  mutate(twoplus_flag = as.numeric(twoplus > 0))
rm(gen1, gen2, gen3, gen4)
gc()

# attach the flag to the per-profile table (i0 is the profile_id of the seed)
unions <- left_join(
  unions,
  select(twoplus_flag, profile_id = i0, twoplus_flag)
)

# check for duplicates: every profile_id must appear exactly once
check <- unions %>%
  summarize(n = n(), .by = profile_id)
stopifnot(max(check$n) == 1)
rm(check, twoplus, twoplus_flag)
gc()

# sanity check on the flag: its mean should be just over 60%
unions$twoplus_flag %>%
  summary() %>%
  print()

#------------------------------------------------------------------------------#

# save the per-profile family ids and flags
write_parquet(unions, sink = "./2_scripts/2_0_tempfiles/fr-family-ids.parquet")
#------------------------------------------------------------------------------#